## NOTES:
## Example Airflow DAG that runs a unit test (validation) job
## against a feature group in the Feature Store and then
## fetches the validation result. If the validation was
## unsuccessful, the rest of the pipeline fails.
##
## It assumes that you have already populated your Feature Store
## and composed some validation rules. You can run the
## Feature Store tour and follow the instructions in the
## documentation:
## https://hopsworks.readthedocs.io/en/latest/user_guide/hopsworks/featurestore.html

import airflow

from datetime import datetime, timedelta
from airflow import DAG

from hopsworks_plugin.operators.hopsworks_operator import HopsworksLaunchOperator
from hopsworks_plugin.operators.hopsworks_operator import HopsworksFeatureValidationResult

###################################################
## These variables should be changed accordingly ##
###################################################

# Your Hopsworks username.
# You can find it by clicking on "Account" in the top-right drop-down menu.
DAG_OWNER = "meb10000"

# Name of the Hopsworks project this DAG belongs to.
PROJECT_NAME = "demo_featurestore_admin000"

# Name of the validation job, exactly as it was generated by
# Data Validation and as it is shown in the Jobs UI.
VALIDATION_JOB_NAME = "DV-players_features-v1_1570036411"

# Name of the feature group the validation job runs against.
FEATURE_GROUP_NAME = "players_features"


####################
## DAG definition ##
####################
# Negative offset: added to "now" it yields a moment 10 minutes in the past.
delta = timedelta(minutes=-10)
now = datetime.now()

# Default arguments handed to the DAG below.
#
# NOTE(review): the Airflow documentation advises against dynamic
# start dates such as datetime.now(). It is kept here because this
# template relies on it to get the DAG scheduled right after upload;
# confirm before changing it to a fixed date.
args = dict(
    owner=DAG_OWNER,
    depends_on_past=False,

    # The start date is 10 minutes before the time this file is
    # parsed, so the DAG is scheduled to run automatically once
    # the file is uploaded to Hopsworks.
    start_date=now + delta,

    # Uncomment the following line if you want Airflow
    # to authenticate to Hopsworks using an API key
    # instead of JWT.
    #
    # NOTE: Edit only YOUR_API_KEY
    #
    #params={'hw_api_key': 'YOUR_API_KEY'},
)

# The DAG object; the tasks in this file attach themselves to it.
dag = DAG(
    # Arbitrary identifier/name of the DAG
    dag_id="players_features_unit_testing",

    # "@once" runs the DAG only one time.
    # Cron-like expressions are accepted as well,
    # e.g. run every 30 minutes: */30 * * * *
    schedule_interval="@once",

    default_args=args,
)

# Task 1 of the DAG: launch the validation job and wait for it to finish.
validation = HopsworksLaunchOperator(
    dag=dag,
    project_name=PROJECT_NAME,
    job_name=VALIDATION_JOB_NAME,
    # Arbitrary task name; here it embeds the job name
    task_id="validation_job-{0}".format(VALIDATION_JOB_NAME),
)

# Task 2 of the DAG: fetch the validation result of the
# feature group and evaluate it.
result = HopsworksFeatureValidationResult(
    dag=dag,
    project_name=PROJECT_NAME,
    feature_group_name=FEATURE_GROUP_NAME,
    task_id="evaluate_validation_result",
)


# Task ordering: "validation" runs first and must finish
# successfully before "result" evaluates the outcome of the
# validation. If the validation was not successful, the DAG fails.
# (Equivalent to the bitshift form: validation >> result)
validation.set_downstream(result)
